import com.gfscold.trans.common.app.BaseSQLApp;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DwdGPSDevice extends BaseSQLApp {

    @Override
    public void handle(StreamTableEnvironment tableEnv, StreamExecutionEnvironment env, String groupId) {
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS gtd_device_gps_data (\n" +
                "    `id` BIGINT NOT NULL,\n" +
                "    `device_code` STRING COMMENT '设备编号',\n" +
                "    `device_name` STRING COMMENT '设备名称',\n" +
                "    `vehicle_number` STRING COMMENT '车牌号',\n" +
                "    `province` STRING COMMENT '省',\n" +
                "    `city` STRING COMMENT '市',\n" +
                "    `area` STRING COMMENT '区',\n" +
                "    `position_name` STRING COMMENT '位置名称',\n" +
                "    `road_name` STRING COMMENT '道路名称',\n" +
                "    `direction` FLOAT COMMENT '方向',\n" +
                "    `gps_time` TIMESTAMP COMMENT 'GPS时间'," +
                "     et AS TO_TIMESTAMP(DATE_FORMAT(gps_time, 'yyyy-MM-dd HH:mm:ss')),\n" +
                "    `longitude1` DECIMAL(9,6) COMMENT '经度1',\n" +
                "    `latitude1` DECIMAL(9,6) COMMENT '纬度1',\n" +
                "    `longitude2` DECIMAL(9,6) COMMENT '经度2',\n" +
                "    `latitude2` DECIMAL(9,6) COMMENT '纬度2',\n" +
                "    `longitude3` DECIMAL(9,6) COMMENT '经度3',\n" +
                "    `latitude3` DECIMAL(9,6) COMMENT '纬度3',\n" +
                "    `longitude4` DECIMAL(9,6) COMMENT '经度4',\n" +
                "    `latitude4` DECIMAL(9,6) COMMENT '纬度4',\n" +
                "    `speed` FLOAT COMMENT '速度',\n" +
                "    `create_time` TIMESTAMP COMMENT '创建时间',\n" +
                "    WATERMARK FOR et AS et - INTERVAL '30' SECOND,\n" +
                "    PRIMARY KEY ( `id` ) NOT ENFORCED\n" +
                ") WITH (\n" +
                "     'connector' = 'mysql-cdc'\n" +
                "    ,'hostname' = 'heukrftr7x7cmlu6no4g.rwlb.rds.aliyuncs.com'\n" +
                "    ,'port' = '3306'\n" +
                "    ,'username' = 'zhhuang4'\n" +
                "    ,'password' = 'HZfR1cqrRcZC^'\n" +
                "    ,'scan.incremental.snapshot.enabled' = 'true'\n" +
                "    ,'debezium.snapshot.mode'='initial'  \n" +
                "    ,'database-name' = 'gfs_tms_device'\n" +
                "    ,'table-name' = 'gtd_device_gps_data_.*')\n"
                );
        tableEnv.executeSql("select\n" +
                "* \n" +
                "from \n" +
                "(\n" +
                "    select \n" +
                "            *\n" +
                "            ,ROW_NUMBER() OVER(PARTITION BY  window_start, window_end, device_code ORDER BY et desc)   rn\n" +
                "    from table(\n" +
                "        TUMBLE(table `gtd_device_gps_data`, descriptor(et), INTERVAL '1' MINUTE )\n" +
                "    )\n" +
                ")\n" +
                "where rn = 1").print();

    }

    public static void main(String[] args) {
        new DwdGPSDevice().start(9999, 1, "dwd_gps_device:0.0.0.0:9000");
    }
}
/*
select *
from
(
  select
    *
    , ROW_NUMBER() OVER(PARTITION BY  window_start, window_end, device_code ORDER BY event_time desc)   rn
  from TABLE (
    TUMBLE(table ods_gtd_device_temp_data, descriptor(event_time), INTERVAL '1' MINUTE)
  )
)
where rn = 1
;
 */